package top.cyuw.simplerpc.remoting.client;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import top.cyuw.simplerpc.constant.RpcConstants;
import top.cyuw.simplerpc.dto.RpcMessage;
import top.cyuw.simplerpc.dto.RpcResponse;

import java.util.concurrent.CompletableFuture;

/**
 * @author chen
 * @date 2023/3/13 22:20
 */
@Slf4j
public class SimpleRpcClientHandler extends ChannelInboundHandlerAdapter {

    private final FutureManager futureManager;

    public SimpleRpcClientHandler(FutureManager futureManager) {
        this.futureManager = futureManager;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (msg instanceof RpcMessage) {
                RpcMessage rpcMessage = (RpcMessage) msg;
                byte messageType = rpcMessage.getMessageType();
                if (messageType == RpcConstants.MESSAGE_TYPE_RPC_RESPONSE) {
                    RpcResponse rpcResponse = (RpcResponse) rpcMessage.getBody();
                    CompletableFuture<RpcResponse> future = futureManager.remove(rpcResponse.getRequestId());
                    if (future != null) {
                        future.complete(rpcResponse);
                    }
                }
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                RpcMessage rpcMessage = new RpcMessage();
                rpcMessage.setMessageType(RpcConstants.MESSAGE_TYPE_HEARTBEAT_PING);
                ctx.channel().writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("client exception: " + cause.getMessage(), cause);
        ctx.close();
    }
}
